iT邦幫忙

2025 iThome 鐵人賽

DAY 29
0

AI/ML 異常檢測功能概覽

Graylog Security 6.3 內建異常檢測模組,利用 OpenSearch 的機器學習引擎對日誌數據進行行為基準分析與異常偵測,具備以下特性:

  • 自動化模型訓練:根據歷史日誌自動建立行為基準線
  • 多維度特徵指標:同時分析事件頻率、來源、時間分佈等多種維度
  • 可調整敏感度:low/medium/high 三種檢測靈敏度
  • 可視化監控:專屬儀表板顯示檢測結果與模型狀態

核心架構組成

  1. 數據源層(Indices)
    • 日誌事件以索引形式存儲於 OpenSearch
    • 支援多組索引(如 graylog_*, application_logs)
  2. ML 引擎層(Anomaly Detection Plugin)
    • 建立並訓練異常檢測模型
    • 定期執行檢測任務,輸出異常分數與警報
  3. 控制與管理層(Web UI)
    • 透過「System → Anomaly Detection」頁面啟用、設定及監控
    • 提供模型運行狀態、檢測日誌、誤報統計等資訊
  4. 警報與通知層(Alerts / Streams / Dashboards)
    • 異常分數超過門檻時觸發警報
    • 可結合 Streams Route 將異常事件推送至郵件、Slack 或 Webhook

核心概念解析

行為基準線(Baseline)

系統在啟用後,根據「訓練期間(Training Period)」內的正常流量建立行為基準。此基準含多維度統計(如事件總量、平均頻率、峰值時間),使後續異常偵測更具上下文意義。

檢測敏感度(Sensitivity)

  • Low:寬鬆門檻,僅檢測明顯異常;適用於高噪音環境
  • Medium:平衡靈敏度與誤報率,2025 年預設值
  • High:嚴格門檻,適用於對安全需求最高的場景

檢測週期(Detection Interval)

系統以固定秒數為間隔執行批次檢測,典型值為 300 秒。間隔越短,偵測越即時,但對資源需求更高。

多維特徵選取(Feature Attributes)

可針對不同索引與應用場景選擇關鍵欄位,如:

  • 使用者 ID(user_id)
  • 來源 IP(src_addr)
  • 請求路徑(http_path)

這些欄位將作為特徵變數餵入 ML 模型,偵測異常偏離。

運行與監控

啟用與管理

在 Web UI「System → Anomaly Detection」頁面:

  • 點擊 Enable 啟用功能
  • 設定 Training PeriodSensitivityDetection Interval
  • 查看 Model Status 以確認訓練完成度

結果呈現

  • Anomalies View:列出最新偵測到的異常事件
  • Metrics Dashboard:監控模型訓練進度、異常分數分佈與誤報率

Pipeline 規則範例

以下範例示範如何根據異常分數(anomaly_score)將事件路由至專用 Stream,以便 Alert 或後續處理。先在 Graylog UI 中建立一個名為「Anomaly_Stream」的 Stream。

rule "Route events by anomaly score"
when
  has_field("anomaly_score") && 
  to_double($message.anomaly_score) > 75.0
then
  route_to_stream("Anomaly_Stream");
  set_field("anomaly_detected", true);
end
  • has_field("anomaly_score"):檢查事件是否包含異常分數欄位
  • to_double(...) > 75.0:設定分數門檻為 75
  • route_to_stream("Anomaly_Stream"):將符合條件的事件推送至該 Stream
  • set_field("anomaly_detected", true):標記事件為已偵測異常

Python API 範例

以下以 Python 3.11+ 說明如何透過 Graylog REST API 建立、查詢與刪除自定義異常檢測器(Detector)。

import requests

GRAYLOG_URL = "http://<graylog-ip>:9000"
API_TOKEN   = "YOUR_API_TOKEN"

headers = {
    "Authorization": f"Bearer {API_TOKEN}",
    "Content-Type": "application/json"
}

def create_detector(name: str, index_pattern: str) -> dict:
    payload = {
        "name": name,
        "description": "自動化異常檢測器",
        "time_field": "@timestamp",
        "indices": [index_pattern],
        "feature_attributes": [
            {"type": "count", "field": "message"},
            {"type": "mean",  "field": "duration_ms"}
        ],
        "detection_interval": {"period": {"interval": 5, "unit": "MINUTES"}},
        "window_delay":        {"period": {"interval": 1, "unit": "MINUTES"}}
    }
    resp = requests.post(
        f"{GRAYLOG_URL}/api/plugins/anomaly_detection/detectors",
        headers=headers, json=payload
    )
    return resp.json()

def list_detectors() -> dict:
    resp = requests.get(
        f"{GRAYLOG_URL}/api/plugins/anomaly_detection/detectors",
        headers=headers
    )
    return resp.json()

def delete_detector(detector_id: str) -> None:
    requests.delete(
        f"{GRAYLOG_URL}/api/plugins/anomaly_detection/detectors/{detector_id}",
        headers=headers
    )

if __name__ == "__main__":
    # 建立一個新的偵測器
    new = create_detector("ExampleDetector", "graylog_*")
    print("Created Detector ID:", new.get("id"))

    # 列出所有偵測器
    all_detectors = list_detectors()
    print("Detectors List:", all_detectors)

    # 刪除示範用偵測器
    example_id = new.get("id")
    if example_id:
        delete_detector(example_id)
        print(f"Deleted Detector ID: {example_id}")
  • feature_attributes:定義要監控的特徵欄位及統計方式
  • detection_intervalwindow_delay:控制偵測執行與遲延時間
  • 使用 Bearer Token 驗證,確保 API 權限

上一篇
Day 28: Docker 環境下 Graylog 版本升級實戰
下一篇
Day 30: 總結
系列文
從零開始的 graylog 探險30
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言